-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsmartnet.py
425 lines (338 loc) · 13.5 KB
/
smartnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
#!/usr/bin/python
import socket
from threading import Timer,Thread
from time import time, sleep
def shift_this(number, high_first=True):
"""Utility method: extracts MSB and LSB from number.
Args:
number - number to shift
high_first - MSB or LSB first (true / false)
Returns:
(high, low) - tuple with shifted values
"""
low = (number & 0xFF)
high = ((number >> 8) & 0xFF)
if high_first:
return((high, low))
return((low, high))
def put_in_range(number, range_min, range_max, make_even=True):
"""Utility method: sets number in defined range.
DEPRECATED: this will be removed from the library"""
number = max(range_min, min(number, range_max))
if make_even:
if number % 2 != 0:
number += 1
return number
def make_address_mask(universe, sub=0, net=0, is_simplified=True):
"""Returns the address bytes for a given universe, subnet and net.
Args:
universe - Universe to listen
sub - Subnet to listen
net - Net to listen
is_simplified - Whether to use nets and subnet or universe only,
see User Guide page 5 (Universe Addressing)
Returns:
bytes - byte mask for given address
"""
def clamp(number, min_val, max_val):
return max(min_val, min(number, max_val))
def shift_this(number, high_first=True):
low = (number & 0xFF)
high = ((number >> 8) & 0xFF)
if high_first:
return((high, low))
return((low, high))
address_mask = bytearray()
if is_simplified:
# Ensure data is in right range
universe = clamp(universe, 0, 32767)
# Make mask
msb, lsb = shift_this(universe) # convert to MSB / LSB
address_mask.append(lsb)
address_mask.append(msb)
else:
# Ensure data is in right range
universe = clamp(universe, 0, 15)
sub = clamp(sub, 0, 15)
net = clamp(net, 0, 127)
# Make mask
address_mask.append(sub << 4 | universe)
address_mask.append(net & 0xFF)
return address_mask
class Smartnet():
"""(Very) simple implementation of Artnet."""
UDP_PORT = 6454
def __init__(self, target_ip='127.0.0.1', universes: list = [0],fps=40, broadcast=False):
"""Initializes Art-Net Client.
Args:
targetIP - IP of receiving device
universes - universes to listen
fps - transmition rate
broadcast - whether to broadcast in local sub
Returns:
None
"""
# Instance variables
self.target_ip = target_ip
self.sequence = 0
self.subnet = 0
self.net = 0
self.header_list = list() # contains a header with every universe
# UDP SOCKET
self.socket_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if broadcast:
self.socket_client.setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# Timer
self.fps = fps
self.__clock = None
#make set of headers for every universe
universes.sort()
i = 0
for u in universes:
if i == u:
self.header_list.append(self.make_header_no_packetsize(self.net, self.subnet, u))
i += 1
else:
while(i != u): # in case universes are skipped, so index matches with universe
self.header_list.append(None)
i += 1
self.header_list.append(self.make_header_no_packetsize(self.net, self.subnet, u))
i += 1
def __del__(self):
"""Graceful shutdown."""
self.stop()
self.close()
def __str__(self):
"""Printable object state."""
state = "===================================\n"
state += "Stupid Artnet initialized\n"
state += f"Target IP: {self.target_ip} : {self.UDP_PORT} \n"
state += f"Universe: {self.universe} \n"
if not self.is_simplified:
state += f"Subnet: {self.subnet} \n"
state += f"Net: {self.net} \n"
state += f"Packet Size: {self.packet_size} \n"
state += "==================================="
return state
def make_header_no_packetsize(self, net: int, subnet: int, universe: int):
"""Creates Header to save in set and add packetsize dynamically."""
# 0 - id (7 x bytes + Null)
tmp = bytearray()
tmp.extend(bytearray('Art-Net', 'utf8'))
tmp.append(0x0)
# 8 - opcode (2 x 8 low byte first)
tmp.append(0x00)
tmp.append(0x50) # ArtDmx data packet
# 10 - prototocol version (2 x 8 high byte first)
tmp.append(0x0)
tmp.append(14)
# 12 - sequence (int 8), NULL for not implemented
tmp.append(self.sequence)
# 13 - physical port (int 8)
tmp.append(0x00)
# 14 - universe, (2 x 8 low byte first)
# as specified in Artnet 4 (remember to set the value manually after):
# Bit 3 - 0 = Universe (1-16)
# Bit 7 - 4 = Subnet (1-16)
# Bit 14 - 8 = Net (1-128)
# Bit 15 = 0
# this means 16 * 16 * 128 = 32768 universes per port
# a subnet is a group of 16 Universes
# 16 subnets will make a net, there are 128 of them
tmp.append(subnet << 4 | universe)
tmp.append(net & 0xFF)
return tmp
def __make_packetsize_byte(self, packet_size):
# 16 - packet size (2 x 8 high byte first)
#packet_size = put_in_range(packet_size, 2, 512, self.make_even)
psize = bytearray()
msb, lsb = shift_this(packet_size) # convert to MSB / LSB
psize.append(msb)
psize.append(lsb)
return psize
def send_data(self, data: bytearray, universe: int):
"""Finally send data."""
packet = self.header_list[universe] + self.__make_packetsize_byte(len(data))
packet.extend(data)
try:
self.socket_client.sendto(packet, (self.target_ip, self.UDP_PORT))
except socket.error as error:
print(f"ERROR: Socket error with exception: {error}")
finally:
self.sequence = (self.sequence + 1) % 256
def close(self):
"""Close UDP socket."""
self.socket_client.close()
# THREADING #
def start(self):
"""Starts thread clock."""
self.show()
self.__clock = Timer((1000.0 / self.fps) / 1000.0, self.start)
self.__clock.daemon = True
self.__clock.start()
def stop(self):
"""Stops thread clock."""
if self.__clock is not None:
self.__clock.cancel()
# SETTERS - DATA #
def clear(self):
"""Clear DMX buffer."""
self.buffer = bytearray(self.packet_size)
# AUX Function #
def send(self, packet):
"""Set buffer and send straightaway.
Args:
array - integer array to send
"""
self.set(packet)
self.show()
class SmartNetServer():
"""(Very) simple implementation of an Artnet Server."""
UDP_PORT = 6454
socket_server = None
ARTDMX_HEADER = b'Art-Net\x00\x00P\x00\x0e'
listeners = []
def __init__(self):
"""Initializes Art-Net server."""
# server active flag
self.listen = True
self.server_thread = Thread(target=self.__init_socket, daemon=True)
self.server_thread.start()
def __init_socket(self):
"""Initializes server socket."""
# Bind to UDP on the correct PORT
self.socket_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_server.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket_server.bind(('', self.UDP_PORT)) # Listen on any valid IP
while self.listen:
data, unused_address = self.socket_server.recvfrom(1024)
# only dealing with Art-Net DMX
if self.validate_header(data):
# check if this address is in any registered listener
for listener in self.listeners:
# is it the address we are listening to
if listener['address_mask'] == data[14:16]: # and self.listen
listener['buffer'] = list(data)[18:]
# check for registered callbacks
if listener['callback'] is not None:
listener['callback'](listener['buffer'], listener['universe'])
def __del__(self):
"""Graceful shutdown."""
self.listeners.clear()
self.close()
def __str__(self):
"""Printable object state."""
state = "===================================\n"
state += "Stupid Artnet Listening\n"
return state
def register_listener(self, universe=0, sub=0, net=0,
is_simplified=True, callback_function=None):
"""Adds a listener to an Art-Net Universe.
Args:
universe - Universe to listen
sub - Subnet to listen
net - Net to listen
is_simplified - Whether to use nets and subnet or universe only,
see User Guide page 5 (Universe Addressing)
callback_function - Function to call when new packet is received
Returns:
id - id of listener, used to delete listener if required
"""
listener_id = len(self.listeners)
new_listener = {
'id': listener_id,
'simplified': is_simplified,
'address_mask': make_address_mask(universe, sub, net, is_simplified),
'callback': callback_function,
'buffer': [],
'universe': universe
}
self.listeners.append(new_listener)
return listener_id
def register_multiple_listeners(self, universes: list = [0], sub=0, net=0,
is_simplified=True, callback_function=None):
"""Adds multiple listeners for multiple universes.
Args:
universes - List of universes to listen
sub - Subnet to listen
net - Net to listen
is_simplified - Whether to use nets and subnet or universe only,
see User Guide page 5 (Universe Addressing)
callback_function - Function to call when new packet is received
Returns:
listener_list - list of all used listener ids, used to delete listener if required
"""
listener_list = []
for universe in universes:
listener_list.append(self.register_listener(universe, sub, net, is_simplified, callback_function))
return listener_list
def delete_listener(self, listener_id):
"""Deletes a registered listener.
Args:
listener_id - Id of listener to delete
Returns:
None
"""
self.listeners = [
i for i in self.listeners if not i['id'] == listener_id]
def delete_all_listener(self):
"""Deletes all registered listeners.
Returns:
None
"""
self.listeners = []
def see_buffer(self, listener_id):
"""Show buffer values."""
for listener in self.listeners:
if listener.get('id') == listener_id:
return listener.get('buffer')
return "Listener not found"
def get_buffer(self, listener_id):
"""Return buffer values."""
for listener in self.listeners:
if listener.get('id') == listener_id:
return listener.get('buffer')
print("Buffer object not found")
return []
print("No Listener with given id found")
return []
def clear_buffer(self, listener_id):
"""Clear buffer in listener."""
for listener in self.listeners:
if listener.get('id') == listener_id:
listener['buffer'] = []
def set_callback(self, listener_id, callback_function):
"""Add / change callback to a given listener."""
for listener in self.listeners:
if listener.get('id') == listener_id:
listener['callback'] = callback_function
def set_address_filter(self, listener_id, universe, sub=0, net=0,
is_simplified=True):
"""Add / change filter to existing listener."""
# make mask bytes
address_mask = make_address_mask(
universe, sub, net, is_simplified)
# find listener
for listener in self.listeners:
if listener.get('id') == listener_id:
listener['simplified'] = is_simplified
listener['address_mask'] = address_mask
listener['buffer'] = []
def close(self):
"""Close UDP socket."""
self.listen = False # Set flag
self.server_thread.join() # Terminate thread once jobs are complete
@staticmethod
def validate_header(header):
"""Validates packet header as Art-Net packet.
- The packet header spells Art-Net
- The definition is for DMX Artnet (OPCode 0x50)
- The protocol version is 15
Args:
header - Packet header as bytearray
Returns:
boolean - comparison value
"""
return header[:12] == SmartNetServer.ARTDMX_HEADER