-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfacsvatar_ops.py
executable file
·343 lines (283 loc) · 16.4 KB
/
facsvatar_ops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
# Copyright (c) Stef van der Struijk <stefstruijk@protonmail.ch>
import bpy
import sys
import subprocess # use Python executable (for pip usage)
from pathlib import Path # Object-oriented filesystem paths since Python 3.4
import json
class SOCKET_OT_connect_subscriber(bpy.types.Operator):
"""Manages the binding of a subscriber ZeroMQ socket and processing the received data"""
# Use this as a tooltip for menu items and buttons.
bl_idname = "socket.connect_subscriber" # Unique identifier for buttons and menu items to reference.
bl_label = "Connect socket" # Display name in the interface.
bl_options = {'REGISTER', 'UNDO'} # Enable undo for the operator; UNTESTED
statetest = "Nothing yet..."
def execute(self, context): # execute() is called when running the operator.
"""Either sets-up a ZeroMQ subscriber socket and make timed_msg_poller active,
or turns-off the timed function and shuts-down the socket."""
# if this operator can be triggered thought an interface button, pyzmq has been installed
import zmq
# get access to our Properties defined in BlendzmqPreferences() (__init__.py)
preferences = context.preferences.addons[__package__].preferences
# get access to our Properties in FACSvatarProperties() (blendzmq_props.py)
self.socket_settings = context.window_manager.socket_settings
# connect our socket if it wasn't and call Blender's timer function on self.timed_msg_poller
if not self.socket_settings.socket_connected:
# scene info
self.frame_start = context.scene.frame_current
self.report({'INFO'}, "Connecting ZeroMQ socket...")
# create a ZeroMQ context
self.zmq_ctx = zmq.Context().instance()
# connect to ip and port specified in interface (blendzmq_panel.py)
self.url = f"tcp://{preferences.socket_ip}:{preferences.socket_port}"
# store our connection in Blender's WindowManager for access in self.timed_msg_poller()
bpy.types.WindowManager.socket_sub = self.zmq_ctx.socket(zmq.SUB)
bpy.types.WindowManager.socket_sub.connect(self.url) # publisher connects to this (subscriber)
bpy.types.WindowManager.socket_sub.setsockopt(zmq.SUBSCRIBE, ''.encode('ascii'))
self.report({'INFO'}, "Sub connected to: {}\nWaiting for data...".format(self.url))
# poller socket for checking server replies (synchronous - not sure how to use async with Blender)
self.poller = zmq.Poller()
self.poller.register(bpy.types.WindowManager.socket_sub, zmq.POLLIN)
# let Blender know our socket is connected
self.socket_settings.socket_connected = True
# reference to selected objects at start of data stream;
# a copy is made, because this is a pointer (which is updated when another object is selected)
self.selected_objs = bpy.context.scene.view_layers[0].objects.selected.items().copy() # .active
# have Blender call our data listening function in the background
bpy.app.timers.register(self.timed_msg_poller)
# bpy.app.timers.register(partial(self.timed_msg_poller, context))
# stop ZMQ poller timer and disconnect ZMQ socket
else:
print(self.statetest)
# cancel timer function with poller if active
if bpy.app.timers.is_registered(self.timed_msg_poller):
bpy.app.timers.unregister(self.timed_msg_poller())
# Blender's property socket_connected might say connected, but it might actually be not;
# e.g. on Add-on reload
try:
# close connection
bpy.types.WindowManager.socket_sub.close()
self.report({'INFO'}, "Subscriber socket closed")
except AttributeError:
self.report({'INFO'}, "Subscriber was socket not active")
# let Blender know our socket is disconnected
bpy.types.WindowManager.socket_sub = None
self.socket_settings.socket_connected = False
return {'FINISHED'} # Lets Blender know the operator finished successfully.
def timed_msg_poller(self): # context
"""Keeps listening to integer values and uses that to move (previously) selected objects"""
socket_sub = bpy.types.WindowManager.socket_sub
# only keep running if socket reference exist (not None)
if socket_sub:
# get sockets with messages (0: don't wait for msgs)
sockets = dict(self.poller.poll(0))
# check if our sub socket has a message
if socket_sub in sockets:
# get the message
topic, timestamp, msg = socket_sub.recv_multipart()
# print("On topic {}, received data: {}".format(topic, msg))
# turn bytes to json string
msg = msg.decode('utf-8')
# context stays the same as when started?
self.socket_settings.msg_received = msg
# check if we didn't receive None (final message)
if msg:
# turn json string to dict
msg = json.loads(msg)
# update selected obj only if property `dynamic_object` is on (blendzmq_props.py)
if self.socket_settings.dynamic_object:
# only active object (no need for a copy)
# self.selected_obj = bpy.context.scene.view_layers[0].objects.active
# collections work with pointers and doesn't keep the old reference, therefore we need a copy
self.selected_objs = bpy.context.scene.view_layers[0].objects.selected.items().copy()
# if we only wanted to update the active object with `.objects.active`
# self.selected_obj.location.x = move_val
# move all (previously) selected objects' x coordinate to move_val
for obj in self.selected_objs:
# TODO check if FACS compatible model
try:
insert_frame = self.frame_start + msg['frame']
# set blendshapes only if blendshape data is available and not empty
if self.socket_settings.facial_configuration and 'blendshapes' in msg and msg['blendshapes']:
# obj[1] == bpy.data.objects['mb_model']
self.set_blendshapes(obj[1], msg['blendshapes'], insert_frame)
else:
self.report({'INFO'}, "No blendshape data found in received msg")
# set pose only if bone rotation is on, pose data is available and not empty
if self.socket_settings.rotate_head and 'pose' in msg and msg['pose']:
# obj[1] == bpy.data.objects['mb_model']
self.set_head_neck_pose(obj[1], msg['pose'], insert_frame)
else:
self.report({'INFO'}, "No pose data found in received msg")
except:
self.report({'WARNING'}, "Object likely not a support model")
else:
self.socket_settings.msg_received = "Last message received."
# keep running and check every 0.1 millisecond for new ZeroMQ messages
return 0.001
# no return stops the timer to this function
def set_blendshapes(self, obj, blendshape_data, insert_frame):
# set all shape keys values
# bpy.context.scene.objects.active = self.mb_body
for bs in blendshape_data:
# skip setting shape keys for breathing from data
if not bs.startswith("Expressions_chestExpansion"):
# print(bs)
# MB fix Caucasian female
# if not bs == "Expressions_eyeClosedR_max":
val = blendshape_data[bs]
# obj[bs].value = val
obj.data.shape_keys.key_blocks[bs].value = val
# save as key frames if enabled
if self.socket_settings.keyframing:
obj.data.shape_keys.key_blocks[bs] \
.keyframe_insert(data_path="value", frame=insert_frame)
# TODO make faster (with quaternions?)
def set_head_neck_pose(self, obj, pose_data, insert_frame):
# get head and neck bone for rotation
head_bones = [obj.parent.pose.bones['head'], obj.parent.pose.bones['neck']]
for bone in head_bones:
# https://blender.stackexchange.com/questions/28159/how-to-rotate-a-bone-using-python
# Set rotation mode to Euler XYZ, easier to understand than default quaternions
bone.rotation_mode = 'XYZ'
if self.socket_settings.mirror_head:
mirror_head = -1
else:
mirror_head = 1
# in case we filter data
if 'pose_Rx' in pose_data:
self.rotate_head_bones(head_bones, 0, pose_data['pose_Rx'], 1) # pitch
if 'pose_Ry' in pose_data:
self.rotate_head_bones(head_bones, 1, pose_data['pose_Ry'], -1 * mirror_head) # jaw
if 'pose_Rz' in pose_data:
self.rotate_head_bones(head_bones, 2, pose_data['pose_Rz'], -1 * mirror_head) # roll
# save as key frames if enabled
if self.socket_settings.keyframing:
head_bones[0].keyframe_insert(data_path="rotation_euler", frame=insert_frame)
head_bones[1].keyframe_insert(data_path="rotation_euler", frame=insert_frame)
# match head pose name with bones in blender
def rotate_head_bones(self, head_bones, xyz, pose, inv=1):
# print(f"Rotate value: {pose}")
# head bone
# print(head_bones[0].rotation_euler[xyz])
head_bones[0].rotation_euler[xyz] = pose * .95 * inv
# print(head_bones[0].rotation_euler[xyz])
# neck bone
head_bones[1].rotation_euler[xyz] = pose * .5 * inv
class PIPZMQ_OT_pip_pyzmq(bpy.types.Operator):
"""Enables and updates pip, and installs pyzmq""" # Use this as a tooltip for menu items and buttons.
bl_idname = "pipzmq.pip_pyzmq" # Unique identifier for buttons and menu items to reference.
bl_label = "Enable pip & install pyzmq" # Display name in the interface.
bl_options = {'REGISTER'}
def execute(self, context): # execute() is called when running the operator.
install_props = context.window_manager.install_props
# enable/import pip
py_exec = self.ensure_pip(install_props=install_props)
# update pip to latest version; not necessary anymore (tested 2.93 LTS & 3.3 LTS)
if bpy.app.version[0] == 2 and bpy.app.version[1] < 91:
self.update_pip(install_props=install_props, py_exec=py_exec)
# install PyZMQ
self.install_pyzmq(install_props=install_props, py_exec=py_exec)
return {'FINISHED'} # Lets Blender know the operator finished successfully
def ensure_pip(self, install_props):
# TODO check permission rights
# TODO Windows ask for permission:
# https://stackoverflow.com/questions/130763/request-uac-elevation-from-within-a-python-script
# TODO pip install with `--user` to prevent problems on Windows?
# https://stackoverflow.com/questions/11161901/how-to-install-python-modules-in-blender
# pip in Blender:
# https://blender.stackexchange.com/questions/139718/install-pip-and-packages-from-within-blender-os-independently/
# pip 2.81 issues: https://developer.blender.org/T71856
# no pip enabled by default version < 2.81
install_props.install_status = "Preparing to enable pip..."
self.report({'INFO'}, "Preparing to enable pip...")
if bpy.app.version[0] == 2 and bpy.app.version[1] < 81:
# find python binary OS independent (Windows: bin\python.exe; Linux: bin/python3.7m)
py_path = Path(sys.prefix) / "bin"
py_exec = str(next(py_path.glob("python*"))) # first file that starts with "python" in "bin" dir
if subprocess.call([py_exec, "-m", "ensurepip"]) != 0:
install_props.install_status += "\nCouldn't activate pip."
self.report({'ERROR'}, "Couldn't activate pip.")
return {'CANCELLED'}
# from 2.81 pip is enabled by default
else:
try:
# will likely fail the first time, but works after `ensurepip.bootstrap()` has been called once
import pip
install_props.install_status += "\nPip imported!"
self.report({'INFO'}, "Pip imported!")
# pip not enabled
except ModuleNotFoundError as e:
# only first attempt will reach here
print("Pip import failed with: ", e)
install_props.install_status += "\nPip not activated, trying bootstrap()"
self.report({'ERROR'}, "Pip not activated, trying bootstrap()")
try:
import ensurepip
ensurepip.bootstrap()
except: # catch *all* exceptions
e = sys.exc_info()[0]
install_props.install_status += "\nPip not activated, trying bootstrap()"
self.report({'ERROR'}, "Pip not activated, trying bootstrap()")
print("bootstrap failed with: ", e)
install_props.install_status += "\nPip activated!"
self.report({'INFO'}, "Pip activated!")
# 2.81 >= Blender < 2.91
if bpy.app.version[0] == 2 and bpy.app.version[1] < 91:
py_exec = bpy.app.binary_path_python
# (tested on 2.93 LTS & 3.3 LTS) Blender >= 2.91
else:
py_exec = sys.executable
return py_exec
def update_pip(self, install_props, py_exec):
install_props.install_status += "\nTrying pip upgrade..."
self.report({'INFO'}, "Trying pip upgrade...")
# pip update
try:
output = subprocess.check_output([py_exec, '-m', 'pip', 'install', '--upgrade', 'pip'])
print(output)
except subprocess.CalledProcessError as e:
install_props.install_status += "\nCouldn't update pip. Please restart Blender and try again."
self.report({'ERROR'}, "Couldn't update pip. Please restart Blender and try again.")
print(e.output)
return {'CANCELLED'}
install_props.install_status += "\nPip working!"
self.report({'INFO'}, "Pip working!")
def install_pyzmq(self, install_props, py_exec):
install_props.install_status += "\nTrying pyzmq install..."
self.report({'INFO'}, "Trying pyzmq install...")
# pyzmq pip install
try:
output = subprocess.check_output([py_exec, '-m', 'pip', 'install', 'pyzmq==24.0.*'])
print(output)
except subprocess.CalledProcessError as e:
install_props.install_status += "\nCouldn't install pyzmq."
self.report({'ERROR'}, "Couldn't install pyzmq.")
print(e.output)
return {'CANCELLED'}
install_props.install_status += "\npyzmq installed! READY!"
self.report({'INFO'}, "pyzmq installed! READY!")
def register():
bpy.utils.register_class(PIPZMQ_OT_pip_pyzmq)
bpy.utils.register_class(SOCKET_OT_connect_subscriber)
def unregister():
bpy.utils.unregister_class(SOCKET_OT_connect_subscriber)
bpy.utils.register_class(PIPZMQ_OT_pip_pyzmq)
if __name__ == "__main__":
register()